/*
Copyright 2022 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
	"context"
	"net/http"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
	"k8s.io/apiserver/pkg/endpoints/metrics"
	"k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/registry/rest"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/scheme"
	listcorev1 "k8s.io/client-go/listers/core/v1"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
	clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
	searchlisters "github.com/karmada-io/karmada/pkg/generated/listers/search/v1alpha1"
	"github.com/karmada-io/karmada/pkg/search/proxy/framework"
	"github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins"
	pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
	"github.com/karmada-io/karmada/pkg/search/proxy/store"
	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/helper"
	"github.com/karmada-io/karmada/pkg/util/lifted"
	"github.com/karmada-io/karmada/pkg/util/restmapper"
)

const workKey = "key"

// Controller syncs Cluster and GlobalResource.
type Controller struct {
	restMapper           meta.RESTMapper
	negotiatedSerializer runtime.NegotiatedSerializer

	secretLister   listcorev1.SecretLister
	clusterLister  clusterlisters.ClusterLister
	registryLister searchlisters.ResourceRegistryLister
	worker         util.AsyncWorker
	store          store.Store

	proxy framework.Proxy
}

// NewControllerOption is the Option for NewController().
type NewControllerOption struct {
	RestConfig *restclient.Config
	RestMapper meta.RESTMapper

	KubeFactory    informers.SharedInformerFactory
	KarmadaFactory informerfactory.SharedInformerFactory

	MinRequestTimeout time.Duration

	OutOfTreeRegistry pluginruntime.Registry
}

// NewController create a controller for proxy
func NewController(option NewControllerOption) (*Controller, error) {
	secretLister := option.KubeFactory.Core().V1().Secrets().Lister()
	clusterLister := option.KarmadaFactory.Cluster().V1alpha1().Clusters().Lister()

	clientFactory := dynamicClientForClusterFunc(clusterLister, secretLister)
	multiClusterStore := store.NewMultiClusterCache(clientFactory, option.RestMapper)

	allPlugins, err := newPlugins(option, multiClusterStore)
	if err != nil {
		return nil, err
	}

	proxy := pluginruntime.NewFramework(allPlugins)

	ctl := &Controller{
		restMapper:           option.RestMapper,
		negotiatedSerializer: scheme.Codecs.WithoutConversion(),
		secretLister:         secretLister,
		clusterLister:        clusterLister,
		registryLister:       option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
		store:                multiClusterStore,
		proxy:                proxy,
	}

	workerOptions := util.Options{
		Name:          "proxy-controller",
		KeyFunc:       nil,
		ReconcileFunc: ctl.reconcile,
	}
	ctl.worker = util.NewAsyncWorker(workerOptions)

	resourceEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(interface{}) {
			ctl.worker.Add(workKey)
		},
		UpdateFunc: func(_, _ interface{}) {
			ctl.worker.Add(workKey)
		},
		DeleteFunc: func(interface{}) {
			ctl.worker.Add(workKey)
		},
	}

	_, err = option.KarmadaFactory.Cluster().V1alpha1().Clusters().Informer().AddEventHandler(resourceEventHandler)
	if err != nil {
		klog.Errorf("Failed to add handler for Clusters: %v", err)
		return nil, err
	}
	_, err = option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Informer().AddEventHandler(resourceEventHandler)
	if err != nil {
		klog.Errorf("Failed to add handler for ResourceRegistries: %v", err)
		return nil, err
	}

	return ctl, nil
}

func newPlugins(option NewControllerOption, clusterStore store.Store) ([]framework.Plugin, error) {
	pluginDependency := pluginruntime.PluginDependency{
		RestConfig:        option.RestConfig,
		RestMapper:        option.RestMapper,
		KubeFactory:       option.KubeFactory,
		KarmadaFactory:    option.KarmadaFactory,
		MinRequestTimeout: option.MinRequestTimeout,
		Store:             clusterStore,
	}

	registry := plugins.NewInTreeRegistry()
	registry.Merge(option.OutOfTreeRegistry)

	allPlugins := make([]framework.Plugin, 0, len(registry))
	for _, pluginFactory := range registry {
		plugin, err := pluginFactory(pluginDependency)
		if err != nil {
			return nil, err
		}

		allPlugins = append(allPlugins, plugin)
	}

	return allPlugins, nil
}

// Start run the proxy controller
func (ctl *Controller) Start(stopCh <-chan struct{}) {
	ctl.worker.Run(1, stopCh)
}

// Stop shutdown cache
func (ctl *Controller) Stop() {
	ctl.store.Stop()
}

// reconcile cache
func (ctl *Controller) reconcile(util.QueueKey) error {
	clusters, err := ctl.clusterLister.List(labels.Everything())
	if err != nil {
		return err
	}

	registries, err := ctl.registryLister.List(labels.Everything())
	if err != nil {
		return err
	}

	resourcesByClusters := make(map[string]map[schema.GroupVersionResource]*store.MultiNamespace)
	for _, registry := range registries {
		matchedResources := make(map[schema.GroupVersionResource]*store.MultiNamespace, len(registry.Spec.ResourceSelectors))
		for _, selector := range registry.Spec.ResourceSelectors {
			gvr, err := restmapper.GetGroupVersionResource(ctl.restMapper, schema.FromAPIVersionAndKind(selector.APIVersion, selector.Kind))
			if err != nil {
				klog.Errorf("Failed to get gvr: %v", err)
				continue
			}

			nsSelector := matchedResources[gvr]
			if nsSelector == nil {
				nsSelector = store.NewMultiNamespace()
				matchedResources[gvr] = nsSelector
			}
			nsSelector.Add(selector.Namespace)
		}

		if len(matchedResources) == 0 {
			continue
		}

		for _, cluster := range clusters {
			if !util.ClusterMatches(cluster, registry.Spec.TargetCluster) {
				continue
			}

			if !util.IsClusterReady(&cluster.Status) {
				klog.Warningf("cluster %s is notReady", cluster.Name)
				continue
			}

			if _, exist := resourcesByClusters[cluster.Name]; !exist {
				resourcesByClusters[cluster.Name] = make(map[schema.GroupVersionResource]*store.MultiNamespace)
			}

			for resource, multiNS := range matchedResources {
				gvk, err := ctl.restMapper.KindFor(resource)
				if err != nil {
					klog.Errorf("Failed to get gvk: %v", err)
					continue
				}
				if !helper.IsAPIEnabled(cluster.Status.APIEnablements, gvk.GroupVersion().String(), gvk.Kind) {
					klog.Warningf("Resource %s is not enabled for cluster %s", resource.String(), cluster)
					continue
				}
				resourcesByClusters[cluster.Name][resource] = multiNS
			}
		}
	}

	return ctl.store.UpdateCache(resourcesByClusters)
}

type errorHTTPHandler struct {
	requestInfo          *request.RequestInfo
	err                  error
	negotiatedSerializer runtime.NegotiatedSerializer
}

func (handler *errorHTTPHandler) ServeHTTP(delegate http.ResponseWriter, req *http.Request) {
	// Write error into delegate ResponseWriter, wrapped in metrics.InstrumentHandlerFunc, so metrics can record this error.
	gv := schema.GroupVersion{
		Group:   handler.requestInfo.APIGroup,
		Version: handler.requestInfo.Verb,
	}
	responsewriters.ErrorNegotiated(handler.err, handler.negotiatedSerializer, gv, delegate, req)
}

// Connect proxy and dispatch handlers
func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder rest.Responder) (http.Handler, error) {
	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		newReq := req.Clone(req.Context())
		newReq.URL.Path = proxyPath
		requestInfo := lifted.NewRequestInfo(newReq)

		newCtx := request.WithRequestInfo(ctx, requestInfo)
		newCtx = request.WithNamespace(newCtx, requestInfo.Namespace)
		newReq = newReq.WithContext(newCtx)

		gvr := schema.GroupVersionResource{
			Group:    requestInfo.APIGroup,
			Version:  requestInfo.APIVersion,
			Resource: requestInfo.Resource,
		}

		h, err := ctl.proxy.Connect(newCtx, framework.ProxyRequest{
			RequestInfo:          requestInfo,
			GroupVersionResource: gvr,
			ProxyPath:            proxyPath,
			Responder:            responder,
			HTTPReq:              newReq,
		})

		if err != nil {
			h = &errorHTTPHandler{
				requestInfo:          requestInfo,
				err:                  err,
				negotiatedSerializer: ctl.negotiatedSerializer,
			}
		}

		h = metrics.InstrumentHandlerFunc(requestInfo.Verb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource,
			"", "karmada-search", false, "", h.ServeHTTP)
		h.ServeHTTP(rw, newReq)
	}), nil
}

func dynamicClientForClusterFunc(clusterLister clusterlisters.ClusterLister,
	secretLister listcorev1.SecretLister) func(string) (dynamic.Interface, error) {
	clusterGetter := func(cluster string) (*clusterv1alpha1.Cluster, error) {
		return clusterLister.Get(cluster)
	}
	secretGetter := func(namespace string, name string) (*corev1.Secret, error) {
		return secretLister.Secrets(namespace).Get(name)
	}

	return func(clusterName string) (dynamic.Interface, error) {
		clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter)
		if err != nil {
			return nil, err
		}
		return dynamic.NewForConfig(clusterConfig)
	}
}
